-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys #42306
Conversation
ccaa7a7
to
80a1ecd
Compare
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
Outdated
Show resolved
Hide resolved
// Support only when all cluster key have an associated partition expression key | ||
requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) && | ||
// and if all partition expression contain only a single partition key. | ||
expressions.forall(_.collectLeaves().size == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm why this condition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was to fix a test, I couldn't find it back to be honest. There was a test somewhere that was trying this case (which isnt actually supported in the code currently), and I think asserting the right exception, which I think would break if SPJ is activated. I could revert this and see again to find the test, if you want.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
Outdated
Show resolved
Hide resolved
groupSplits = true).get | ||
// In the case where we replicate partitions, we have grouped | ||
// the partitions by the join key if they differ | ||
val groupByExpressions = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we override KeyGroupedPartitioning
method in this class, and wrap the logic of handling join keys in the method? We can return a new KeyGroupedPartitioning
instance whose expressions
, partitionValues
are "projected" on the join keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, changed outputPartitioning to return KeyGroupedPartitoning to reflect that.
node.mapChildren(child => populatePartitionValues( | ||
child, values, applyPartialClustering, replicatePartitions)) | ||
node.mapChildren(child => populateStoragePartitionJoinParams( | ||
child, values, partitionGroupByPositions, applyPartialClustering, replicatePartitions)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of populating partitionGroupByPositions
, can we populate StoragePartitionJoinParams.keyGroupedPartitioning
instead? which can be the subset of expressions that participate in the join.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will need some more guidance on this one.
059824b
to
62fa5dd
Compare
|
||
object KeyGroupedShuffleSpec { | ||
|
||
def isExpressionCompatible(left: Expression, right: Expression): Boolean = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just grouping the new static into companion object, so the diff looks a bit bigger, let me know if I should revert
62fa5dd
to
8319e43
Compare
Could you re-trigger the failed pipeline or rebase this PR to the |
Hi @dongjoon-hyun , I think @sunchao had another idea he is thinking about, was going to wait a bit for that to update the pr |
Oh, got it! |
048701b
to
dac34ec
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @szehon-ho ! Looks great with a few comments.
if (SQLConf.get.getConf( | ||
SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) { | ||
requiredClustering.exists(x => attributes.exists(_.semanticEquals(x))) && | ||
expressions.forall(_.collectLeaves().size == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this deserves some comments since otherwise it's a bit confusing why we need it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comment, please check if it makes sense
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
Outdated
Show resolved
Hide resolved
if (SQLConf.get.getConf( | ||
SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)) { | ||
requiredClustering.forall(x => attributes.exists(_.semanticEquals(x))) && | ||
expressions.forall(_.collectLeaves().size == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be guaranteed currently - it might be better to have this invariant check somewhere else like when constructing a KeyGroupedPartitioning
, but OK to leave it here for now
@@ -674,7 +711,8 @@ case class HashShuffleSpec( | |||
|
|||
case class KeyGroupedShuffleSpec( | |||
partitioning: KeyGroupedPartitioning, | |||
distribution: ClusteredDistribution) extends ShuffleSpec { | |||
distribution: ClusteredDistribution, | |||
joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can add some comments for KeyGroupedShuffleSpec
to explain what is this for, otherwise it's a bit hard to understand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments, please check and suggest if it can be improved.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
Outdated
Show resolved
Hide resolved
fe4920d
to
3fd3a7b
Compare
@sunchao thanks! addressed review comments |
…keys ### What changes were proposed in this pull request? - Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled - Change key compatibility checks in EnsureRequirements. Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled) - "Project" partitions by join keys in KeyGroupedPartitioning/KeyGroupedShuffleSpec - Add join key grouping to the partition grouping in BatchScanExec ### Why are the changes needed? - Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? -Added tests in KeyGroupedPartitioningSuite -Because of apache#37886 we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered. Need to see how to relax this in separate PR.
3fd3a7b
to
0df6e97
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Outdated
Show resolved
Hide resolved
69235f9
to
a62e32b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
Merged to mater for Apache Spark 4.0.0. Thank you so much, @szehon-ho and @sunchao ! |
Thanks @szehon-ho @dongjoon-hyun ! |
Hi @dongjoon-hyun @sunchao, |
Apache Spark has a back-porting policy which allows only bug fixes, @irsath . Given that this PR is an improvement, we are unable to touch the release branches like |
Right, sorry for the typo but I meant: what if we make a 3.6 with this PR ? I never contributed to OSS spark but if your ok with the idea I can try to do a PR in that regard. |
…keys - Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled - Change key compatibility checks in EnsureRequirements. Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled) - Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values). Do same for all auxiliary data structure, like commonPartValues. - Implement partiallyClustered skew-handling. - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key. - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side. - Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them No -Added tests in KeyGroupedPartitioningSuite -Found two existing problems, will address in separate PR: - Because of apache#37886 we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered. Need to see how to relax this. - https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change. This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist. Hopefully this will also get fixed in another way. Closes apache#42306 from szehon-ho/spj_attempt_master. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…keys - Add new conf spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled - Change key compatibility checks in EnsureRequirements. Remove checks where all partition keys must be in join keys to allow isKeyCompatible = true in this case (if this flag is enabled) - Change BatchScanExec/DataSourceV2Relation to group splits by join keys if they differ from partition keys (previously grouped only by partition values). Do same for all auxiliary data structure, like commonPartValues. - Implement partiallyClustered skew-handling. - Group only the replicate side (now by join key as well), replicate by the total size of other-side partitions that share the join key. - add an additional sort for partitions based on join key, as when we group the replicate side, partition ordering becomes out of order from the non-replicate side. - Support Storage Partition Join in cases where the join condition does not contain all the partition keys, but just some of them No -Added tests in KeyGroupedPartitioningSuite -Found two existing problems, will address in separate PR: - Because of apache#37886 we have to select all join keys to trigger SPJ in this case, otherwise DSV2 scan does not report KeyGroupedPartitioning and SPJ does not get triggered. Need to see how to relax this. - https://issues.apache.org/jira/browse/SPARK-44641 was found when testing this change. This pr refactors some of those code to add group-by-join-key, but doesnt change the underlying logic, so issue continues to exist. Hopefully this will also get fixed in another way. Closes apache#42306 from szehon-ho/spj_attempt_master. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
Why are the changes needed?
Does this PR introduce any user-facing change?
No
How was this patch tested?
-Added tests in KeyGroupedPartitioningSuite
-Found two existing problems, will address in separate PR: